import com.gfscold.trans.common.app.BaseSQLApp;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DWDTransOrderDevice extends BaseSQLApp {
    @Override
    public void handle(StreamTableEnvironment tableEnv, StreamExecutionEnvironment env, String groupId) {
        tableEnv.executeSql("CREATE TABLE IF NOT EXISTS transportation_order_device (\n" +
                "    `id` INT NOT NULL,\n" +
                "    `transportation_order_number` STRING COMMENT '运单号',\n" +
                "    `platform_code` STRING COMMENT '平台代码',\n" +
                "    `platform_name` STRING COMMENT '平台名称',\n" +
                "    `device_code` STRING COMMENT '设备编号',\n" +
                "    `device_name` STRING COMMENT '设备名称',\n" +
                "    `type` INT NOT NULL COMMENT '类型（1 固定式设备 2 移动式设备）',\n" +
                "    `index_num` INT NOT NULL COMMENT '序号',\n" +
                "    `cre_time` TIMESTAMP COMMENT '数据创建时间',\n" +
                "    `cre_name` STRING COMMENT '数据创建人',\n" +
                "    `mod_time` TIMESTAMP COMMENT '数据修改时间',\n" +
                "    `mod_name` STRING COMMENT '数据修改人',\n" +
                "    PRIMARY KEY ( `id` ) NOT ENFORCED\n" +
                ") WITH (\n" +
                "     'connector' = 'mysql-cdc'\n" +
                "    ,'hostname' = 'heukrftr7x7cmlu6no4g.rwlb.rds.aliyuncs.com'\n" +
                "    ,'port' = '3306'\n" +
                "    ,'username' = 'zhhuang4'\n" +
                "    ,'password' = 'HZfR1cqrRcZC^'\n" +
                "    ,'server-time-zone' = 'Asia/Shanghai'\n" +
                "    ,'scan.incremental.snapshot.enabled' = 'true'\n" +
                "    ,'debezium.snapshot.mode'='initial'  \n" +
                "    ,'database-name' = 'gfs_tms'\n" +
                "    ,'table-name' = 'transportation_order_device')")
                ;
        tableEnv.executeSql("SELECT\n" +
                "    transportation_order_number\n" +
                "    ,device_code\n" +
                "    ,REPLACE(device_name ,'\\n' ,'' )\n" +
                "    ,platform_code\n" +
                "    ,platform_name\n" +
                "    ,type\n" +
                "    ,index_num\n" +
                "    ,cre_time\n" +
                "    ,cre_name\n" +
                "    ,mod_time\n" +
                "    ,mod_name\n" +
                "FROM transportation_order_device").print();
    }

    public static void main(String[] args) {
        new DWDTransOrderDevice().start(9999, 1, "dwd_trans_order_device");
    }
}
